package com.xj.zookeeper.framework;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Dispatches ZooKeeper watch notifications to registered listeners and
 * re-establishes watches and registered nodes after a session has expired.
 *
 * <p>The listener pools and the stabilize-node pool are {@code static}, so
 * they are shared by every {@code WatcherProcess} instance in the JVM.
 *
 * Author: xiajun
 * Date: 2014-08-07
 * Time: 18:06:00
 */
public class WatcherProcess {
    private static final Logger log = Logger.getLogger(WatcherProcess.class);
    // Child-change listeners, keyed by the watched node path.
    private static final ConcurrentHashMap<String, ListenerManager> clientListenPool = new ConcurrentHashMap<String, ListenerManager>();
    // Data-change listeners, keyed by the watched node path.
    private static final ConcurrentHashMap<String, ListenerManager> dataListenPool = new ConcurrentHashMap<String, ListenerManager>();
    // "Stable" nodes that are re-created by recreate() after a reconnect, keyed by node path.
    private static final ConcurrentHashMap<String, Node> stabilizeNodePool = new ConcurrentHashMap<String, Node>();
    private final ZkClient zkClient;

    /**
     * @param zooKeeper client used for all reads, watches and node creation
     */
    public WatcherProcess(ZkClient zooKeeper) {
        this.zkClient = zooKeeper;
    }

    /**
     * Handles a change in the children of {@code path}: re-reads the child
     * list (requesting a watch again), records which child was added or
     * removed on the registered manager, and hands the manager to
     * {@link ListenerProcessPool}. Does nothing when no child listener is
     * registered for the path. Any failure is logged, not propagated.
     *
     * @param path path of the node whose children changed
     */
    public void clientNodeChanged(String path) {
        try {
            // Single lookup instead of containsKey + get, so a concurrent
            // unlisten() cannot leave us holding a null manager.
            ListenerManager manager = clientListenPool.get(path);
            if (manager == null) {
                return;
            }
            List<String> childs = this.zkClient.getChild(path, true);
            diff(childs, manager);
            manager.setClientNode(childs);
            ListenerProcessPool.invoker(manager);
        } catch (Exception e) {
            log.error("get childeren node error.path = " + path, e);
        }
    }

    /**
     * Handles a data change on {@code path}: re-reads the data (requesting a
     * watch again), stores it on the registered manager with event type
     * {@code NodeDataChanged}, and hands the manager to
     * {@link ListenerProcessPool}. Does nothing when no data listener is
     * registered for the path. Any failure is logged, not propagated.
     *
     * @param path path of the node whose data changed
     */
    public void nodeDataChanged(String path) {
        try {
            // Single lookup instead of containsKey + get (see clientNodeChanged).
            ListenerManager manage = dataListenPool.get(path);
            if (manage == null) {
                return;
            }
            byte[] data = this.zkClient.getData(path, true);
            manage.setData(data);
            manage.setEventType(Event.EventType.NodeDataChanged);
            ListenerProcessPool.invoker(manage);
        } catch (Exception e) {
            log.error("get node data error.path=" + path, e);
        }
    }

    /**
     * Registers a listener on {@code node.getNode()}, replacing any listener
     * of the same kind previously registered for that path.
     *
     * @param node         listener to register; its {@code getNode()} is the watched path
     * @param listenClient {@code true} to listen for child changes,
     *                     {@code false} to listen for data changes on the node itself
     * @param instantly    {@code true} to run the change handler once right away
     *                     (which also sets the watch); {@code false} to only set the watch
     * @throws NullPointerException if the path does not exist
     * @throws ZkException          if registering the listener fails
     */
    public void listen(Listener node, boolean listenClient, boolean instantly) throws ZkException {
        if (!zkClient.exists(node.getNode())) {
            throw new NullPointerException("listen path " + node.getNode() + "  not found.");
        }
        try {
            ListenerManager manager = new ListenerManager(node);
            if (listenClient) {
                clientListenPool.put(node.getNode(), manager);
                if (instantly) {
                    clientNodeChanged(node.getNode());
                } else {
                    // Seed the child snapshot so the first real event can be diffed against it.
                    manager.setClientNode(zkClient.getChild(node.getNode(), true));
                }
            } else {
                dataListenPool.put(node.getNode(), manager);
                // Must test 'instantly' here: 'listenClient' is always false in this branch.
                if (instantly) {
                    nodeDataChanged(node.getNode());
                } else {
                    zkClient.getData(node.getNode(), true);
                }
            }
        } catch (Exception e) {
            throw new ZkException("listen node " + node.getNode(), e);
        }
    }

    /**
     * Removes the listener registered for {@code nodePath}. The node is read
     * once more without requesting a watch before the pool entry is removed.
     *
     * @param nodePath     path whose listener should be removed
     * @param listenClient {@code true} to remove the child listener,
     *                     {@code false} to remove the data listener
     * @throws NullPointerException if the path does not exist
     * @throws ZkException          if the ZooKeeper call fails
     */
    public void unlisten(String nodePath, boolean listenClient) throws ZkException {
        if (!zkClient.exists(nodePath)) {
            throw new NullPointerException("listen path " + nodePath + "  not found.");
        }
        if (listenClient) {
            zkClient.getChild(nodePath, false);
            clientListenPool.remove(nodePath);
        } else {
            zkClient.getData(nodePath, false);
            dataListenPool.remove(nodePath);
        }
    }

    /**
     * Registers a node to be re-created by {@link #recreate()} after the
     * session has expired and the client has reconnected.
     *
     * @param node node to remember; stored under {@code node.getPath()}
     */
    public void setStabilizeNode(Node node) {
        WatcherProcess.stabilizeNodePool.put(node.getPath(), node);
    }

    /**
     * Re-requests watches for every registered child and data listener;
     * intended to be called after a session expiry and reconnect.
     *
     * @throws ZkException if a ZooKeeper call fails; remaining paths are then not processed
     */
    public void relisten() throws ZkException {
        for (Map.Entry<String, ListenerManager> entry : clientListenPool.entrySet()) {
            String node = entry.getKey();
            this.zkClient.getChild(node, true);
            log.info("relisten to node [" + node + "] => clientNode");
        }
        for (Map.Entry<String, ListenerManager> entry : dataListenPool.entrySet()) {
            String node = entry.getKey();
            this.zkClient.getData(node, true);
            log.info("relisten to node [" + node + "] => dataNode");
        }
    }

    /**
     * Re-creates, as ephemeral nodes, every registered stabilize node that
     * no longer exists; intended to be called after a session expiry and reconnect.
     *
     * @throws ZkException if a ZooKeeper call fails; remaining nodes are then not processed
     */
    public void recreate() throws ZkException {
        for (Map.Entry<String, Node> entry : stabilizeNodePool.entrySet()) {
            if (!zkClient.exists(entry.getKey())) {
                Node node = entry.getValue();
                this.zkClient.create(node.getPath(), node.getData(), CreateMode.EPHEMERAL);
                log.info("重新创建节点[" + node.getPath() + "].");
            }
        }
    }

    /**
     * Compares the new child list with the snapshot held by {@code opt} and
     * records a single changed child and event type on {@code opt}.
     *
     * <p>If the new list is longer, the last new child absent from the old
     * snapshot is reported as {@code NodeCreated}; otherwise the first old
     * child absent from the new list is reported as {@code NodeDeleted}.
     * When no difference is found, {@code opt} is left untouched.
     *
     * @param client current child names; may be {@code null}
     * @param opt    manager holding the previous snapshot; updated in place
     * @return {@code opt}
     */
    private ListenerManager diff(List<String> client, ListenerManager opt) {
        List<String> oldClient = opt.getClientNode();
        int newSize = client == null ? 0 : client.size();
        // A manager that has no snapshot yet is treated as having no children.
        int oldSize = oldClient == null ? 0 : oldClient.size();
        if (newSize > oldSize) {
            for (int i = newSize - 1; i >= 0; i--) {
                String c = client.get(i);
                if (oldClient == null || !oldClient.contains(c)) {
                    opt.setClient(c);
                    opt.setEventType(Event.EventType.NodeCreated);
                    break;
                }
            }
        } else if (oldClient != null) {
            for (String s : oldClient) {
                // A null or empty new list contains nothing, so the first old child is the deleted one.
                if (client == null || !client.contains(s)) {
                    opt.setClient(s);
                    opt.setEventType(Event.EventType.NodeDeleted);
                    break;
                }
            }
        }
        return opt;
    }
}
